package com.lagou.boot;

import com.lagou.client.RPCConsumer;
import com.lagou.service.IUserService;
import com.lagou.service.UserServiceImpl;
import io.netty.channel.EventLoopGroup;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class ConsumerBoot {

    private static RPCConsumer rpcConsumer;

    public static List<String>[] zkListener(CuratorFramework client, List<IUserService> serviceList, final List<String>[] childrenList) throws Exception {
        //注册对子节点的监听
        PathChildrenCache cache = new PathChildrenCache(client, "/netty", false);
        cache.start();
        PathChildrenCacheListener pccl = (curatorFramework, pathChildrenCacheEvent) -> {
            //清空list
            serviceList.clear();
            childrenList[0] = curatorFramework.getChildren().forPath("/netty");
            for (int i = 0; i < childrenList[0].size(); i++){
                int port = Integer.parseInt(childrenList[0].get(i));
                String host = new String(client.getData().forPath("/netty/" + childrenList[0].get(i)));
                //1.创建代理对象
                rpcConsumer = new RPCConsumer();
                IUserService service = (IUserService) rpcConsumer.createProxy(IUserService.class, host, port);
                serviceList.add(service);
            }
        };
        //注册监听
        cache.getListenable().addListener(pccl);
        return childrenList;
    }

    public static void main(String[] args) throws Exception {
        //连接服务端，从zookeeper获取连接信息
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(50000)
                .connectionTimeoutMs(30000)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        client.start();
        // 获取 /netty 下面的所有的子节点
        List<String>[] childrenList = new List[]{client.getChildren().forPath("/netty")};
        List<IUserService> serviceList = new ArrayList<>();
        // 循环遍历
        for (int i = 0; i < childrenList[0].size(); i++){
            int port = Integer.parseInt(childrenList[0].get(i));
            String host = new String(client.getData().forPath("/netty/" + childrenList[0].get(i)));
            //1.创建代理对象
            rpcConsumer = new RPCConsumer();
            IUserService service = (IUserService) rpcConsumer.createProxy(IUserService.class, host, port);
            serviceList.add(service);
        }

        childrenList = zkListener(client,serviceList,childrenList);

        //2.循环给服务器写数据
        while(true){
            for(int i=0; i<serviceList.size(); i++) {
                String result = serviceList.get(i).sayHello("快给我success！！");
                System.out.println(result);
                Thread.sleep(2000);
            }
        }

    }
}
